﻿using DbSync.Provider;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace DbSync
{
    public class DbSyncCore
    {
        //循环执行
        public static bool IsLoop = true;
        //缓存目录
        static string CachePath = "cache";
        //同步流程
        //1.结构同步
        //表,字段,索引
        //2.数据同步
        //数据同步算法(分页查询,有缺陷)
        public static void StartSync()
        {
            //配置信息 结构同步,数据同步,同步方式:1 全量同步,2 增量同步
            //string _Source = "Data Source=127.0.0.1;Database=xunyou_client;User Id=root;Password=root;Convert Zero Datetime=True;Allow Zero Datetime=True;charset=utf8;Min Pool Size=10;Max Pool Size=512;";
            //string _Target = "Data Source=127.0.0.1;Database=xunyou_client_ss;User Id=root;Password=root;Convert Zero Datetime=True;Allow Zero Datetime=True;charset=utf8;Min Pool Size=10;Max Pool Size=512;";
            //清理缓存,清理更新时间大于1天的缓存文件
            //ClearCache(1);
            //读取配置文件
            int loopIndex = 0;
            string configPath = AppDomain.CurrentDomain.BaseDirectory + "config.json";
            if (File.Exists(configPath))
            {               
                while (IsLoop)
                {
                    loopIndex++;
                    string ConfigJsonString = string.Empty;// File.ReadAllText(configPath, Encoding.UTF8).Replace("\r\n", "");
                    var cstrs = File.ReadAllLines(configPath);
                    foreach(var str in cstrs)
                    {
                        if (!str.Trim().StartsWith("//"))
                        {
                            ConfigJsonString += str.Replace("\r\n", "");
                        }
                    }                    
                    AppConfig config = JsonHelper.JsonToObj<AppConfig>(ConfigJsonString);
                    Console.Title = "数据同步" + (string.IsNullOrEmpty(config.Title) ? "" : "-" + config.Title);
                    List <Task> tasks = new List<Task>();
                    foreach (var item in config.SyncItems)
                    {
                        //启动时全量同步
                        if (item.IsStartRest&& loopIndex==1)
                        {
                            ClearIndexCache(item.Index);
                        }
                        //存在执行计划
                        if (item.ExecuteCroe != null&& item.ExecuteCroe.Count == 2)
                        {
                            if (DateTime.Now < Convert.ToDateTime(item.ExecuteCroe[0]) || DateTime.Now >= Convert.ToDateTime(item.ExecuteCroe[1]))
                            {
                                WriteLog(item.Index+" 计划等待中...", ConsoleColor.Yellow);
                                continue;
                            }
                        }
                        tasks.Add(Task.Run(() =>
                        {
                            WriteLog("正在执行同步["+ loopIndex + "] ...");
                            DataSync(item);
                            WriteLog("同步完成[" + loopIndex + "] !");
                        }));
                    }
                    if (tasks.Count > 0)
                    {
                        Task.WaitAll(tasks.ToArray());
                    }                       
                    Thread.Sleep(1000 * 10);
                }              
            }
            else
            {
                WriteLog("配置文件不存在...");
            }
        }

        /// <summary>
        /// 同步
        /// </summary>
        /// <param name="source"></param>
        /// <param name="target"></param>
        static void DataSync(SyncItem config)
        {
            List<string> IncludeTable = string.IsNullOrEmpty(config.IncludeTable) ? new List<string>() : config.IncludeTable.Split(',').ToList();
            List<string> IgnoreTable = string.IsNullOrEmpty(config.IgnoreTable) ? new List<string>() : config.IgnoreTable.Split(',').ToList();
            int _SyncType = config.SyncType;
            IDbProxy source = DbProxy.GetProviderFactory(config.Source);
            if (!source.ConnectStatus)
            {
                WriteLog("源数据库连接失败!", ConsoleColor.Red);
                return;
            }
            List<string> sourceTables = source.GetTables();
            List<IDbProxy> targetPools = new List<IDbProxy>();
            var targetindex = 0;           
            foreach (var titem in config.Target)
            {
                targetindex++;
                IDbProxy target = DbProxy.GetProviderFactory(titem);
                if (!target.ConnectStatus)
                {
                    WriteLog("目标数据库[" + targetindex + "]连接失败!", ConsoleColor.Red);
                }
                else
                {
                    //var target_tables = target.GetTables();
                    targetPools.Add(target);
                }
            }
            if (config.UseStructureSync)
            {
                StartStructureSync(targetPools, source, sourceTables, IncludeTable, IgnoreTable,config.Index);
            }

            if (config.UseDataSync)
            {
                StartDataSync(targetPools, source, sourceTables, IncludeTable, IgnoreTable, config.Index, config.OrderBy);
            }
            
        }

        //缓存当前同步的位置
        static KeyValuePair<int,string> GetPageindex(string index, string tablename)
        {
            var value = new KeyValuePair<int, string>();
            string cache_path = CachePath + "/" + index + "/" + tablename + ".tmp";
            string loacl_cache_path = AppDomain.CurrentDomain.BaseDirectory + cache_path;
            if (File.Exists(loacl_cache_path))
            {
                try
                {
                    var text = File.ReadAllText(loacl_cache_path);
                    var sp = text.Split(new char[] { '|' }).ToList();
                    value = new KeyValuePair<int, string>(int.Parse(sp[0]), sp[1]);
                    return value;
                }
                catch
                {
                    return value;
                }
            }
            else
            {
                return value;
            }
        }
        static bool SetPageindex(string index, string tablename, int pindex,string orderValue)
        {
            var dir = CachePath + "/" + index;
            string cache_path = dir + "/" + tablename + ".tmp";
            string loacl_cache_path = AppDomain.CurrentDomain.BaseDirectory + cache_path;
            try
            {
                if (!Directory.Exists(dir))
                {
                    Directory.CreateDirectory(dir);
                }
                File.WriteAllText(loacl_cache_path, pindex.ToString()+"|"+ orderValue);
                return true;
            }
            catch (Exception ex)
            {
                return false;
            }
        }
        static bool ClearPageindex(string index, string tablename)
        {
            string cache_path = CachePath + "/" + index + "/" + tablename + ".tmp";
            string loacl_cache_path = AppDomain.CurrentDomain.BaseDirectory + cache_path;
            try
            {
                if (File.Exists(loacl_cache_path))
                {
                    File.Delete(loacl_cache_path);
                }
                return true;
            }
            catch (Exception ex)
            {
                return false;
            }
        }
        static bool ClearIndexCache(string syncIndex)
        {
            string cache_path = CachePath+"/"+ syncIndex;
            string loacl_cache_path = AppDomain.CurrentDomain.BaseDirectory + cache_path;
            try
            {
                if (Directory.Exists(loacl_cache_path))
                {
                    var files = Directory.GetFiles(loacl_cache_path);
                    foreach (var file in files)
                    {
                        FileInfo fi = new FileInfo(file);
                        
                            //清理
                            fi.Delete();
                        
                    }
                }
                return true;
            }
            catch (Exception ex)
            {
                return false;
            }
        }

        static void StartStructureSync(List<IDbProxy> targetPools, IDbProxy source,List<string> sourceTables,List<string> IncludeTable, List<string> IgnoreTable, string syncIndex) {
            Dictionary<IDbProxy, List<string>> targetWithTables = new Dictionary<IDbProxy, List<string>>();
            List<string> needAddTables = new List<string>();
            List<string> needModifyTables = new List<string>();
            targetPools.ForEach((item) =>
            {
                var targetTables = item.GetTables();
                needAddTables.Union(sourceTables.Except(targetTables));
                needModifyTables.Union(sourceTables.Union(targetTables));
                targetWithTables.Add(item, targetTables);
            });
            Dictionary<string, string> targetWithCreateSql = new Dictionary<string, string>();
            needAddTables.ForEach((item) =>
            {
                targetWithCreateSql.Add(item, source.GetCreateTableInfo(item));
            });
            //Dictionary<string, Dictionary<string, string>> targetWithColumns = new Dictionary<string, Dictionary<string, string>>();
            //needModifyTables.ForEach((item) =>
            //{
            //    targetWithColumns.Add(item, source.GetTableStructure(item));
            //});
            foreach (var table in sourceTables)
            {
                if (IncludeTable.Count > 0 && !IncludeTable.Contains(table))
                {
                    continue;
                }
                if (IgnoreTable.Count > 0 && IgnoreTable.Contains(table))
                {
                    continue;
                }
                //结构同步                   
                var Columns = source.GetTableStructure(table);
                var Indexs = source.GetIndexMap(table);
                var targetIndex = 0;
                foreach (var target_item in targetWithTables)
                {
                    targetIndex++;
                       //表同步
                       var target = target_item.Key;
                    var targetTables = target_item.Value;
                    if (!targetTables.Contains(table))
                    {
                        //添加表  
                        if (!targetWithCreateSql.ContainsKey(table))
                        {
                            targetWithCreateSql.Add(table, source.GetCreateTableInfo(table));
                        }
                        if (target.CreateTable(targetWithCreateSql[table]))
                        {
                            targetTables.Add(table);
                            ClearPageindex(syncIndex, table);
                            //WriteLog(table+" ["+ targetIndex + "]添加Ok!", ConsoleColor.Green);
                        }
                        else
                        {
                            WriteLog(table + " [" + targetIndex + "]添加Fail!", ConsoleColor.Red);
                        }
                       
                    }
                    DoTableStructureSync(table, Columns, Indexs, target, syncIndex, targetIndex);
                }
            }
            WriteLog("["+ syncIndex + "]结构同步完成!", ConsoleColor.Green);
        }
        static void StartDataSync(List<IDbProxy> targetPools, IDbProxy source, List<string> sourceTables, List<string> IncludeTable, List<string> IgnoreTable,string syncIndex,string orderby)
        {
            var pageSize = 200;
            Dictionary<IDbProxy, List<string>> targetWithTables = new Dictionary<IDbProxy, List<string>>();
            targetPools.ForEach((item) =>
            {
                var targetTables = item.GetTables();
                targetWithTables.Add(item, targetTables);
            });
            List<Task> tasks = new List<Task>();
            foreach (var table in sourceTables)
            {      
                if (IncludeTable.Count > 0 && !IncludeTable.Contains(table))
                {
                    continue;
                }
                if (IgnoreTable.Count > 0 && IgnoreTable.Contains(table))
                {
                    continue;
                }
                var tableKey = "";
                var auto_increment = "";
                var sourceColumns = source.GetTableStructure(table, out tableKey,out auto_increment);
                if (string.IsNullOrEmpty(tableKey))
                {
                    WriteLog(table + " 缺少主键,自动跳过数据同步!", ConsoleColor.Yellow);
                    continue;
                }
                //数据同步               
                var thistask = Task.Factory.StartNew(new Action<object>(cols =>
                {
                    var Columns = cols as Dictionary<string, string>;
                    //排序字段
                    string orderKey = "";
                    if (!string.IsNullOrEmpty(orderby))
                    {
                        var ody = orderby.Split(new char[] { ',' }).ToList();
                        foreach (var item in ody)
                        {
                            if (Columns.Keys.Contains(item))
                            {
                                orderKey = item;
                                break;
                            }
                        }
                    }
                    //读取缓存记录
                    var pageIndex = GetPageindex(syncIndex, table);
                    var pIndex = pageIndex.Key;
                    //设置最大排序值
                    var maxOrderBy = pageIndex.Value;
                    var tableorderby = new KeyValuePair<string, string>("*","*");
                    if (!string.IsNullOrEmpty(orderKey))
                    {
                        tableorderby = new KeyValuePair<string, string>(orderKey, pageIndex.Value);
                    }
                    else
                    {
                        if (tableKey == auto_increment) {
                            tableorderby = new KeyValuePair<string, string>(tableKey, pageIndex.Value);
                            orderKey = tableKey;
                        }
                    }
                    int tryLimit = 9;
                    int tryCount = 0;
                    while (true)
                    {
                        var sourceData = source.GetTableDataByPage(table, pIndex, pageSize, tableKey, "extraid", tableorderby);
                        if (sourceData != null)
                        {
                            if (sourceData.Count <= 0)
                            {
                                //终止
                                //WriteLog(table + " ["+ syncIndex + "]同步已完成!", ConsoleColor.Green);
                                SetPageindex(syncIndex, table, 0, maxOrderBy);
                                break;
                            }
                            else
                            {
                                bool needResetIndex = false;
                                //更新排序信息
                                if (!string.IsNullOrEmpty(orderKey))
                                {
                                    var this_maxOrderBy = sourceData.Last()[orderKey];
                                    if (this_maxOrderBy != maxOrderBy)
                                    {
                                        needResetIndex = true;
                                        maxOrderBy = this_maxOrderBy;
                                        tableorderby = new KeyValuePair<string, string>(orderKey, maxOrderBy);
                                    }
                                }

                                //数据同步进度
                                var dataIndex = pIndex + "*" + pageSize + "[" + tableorderby.Key + "/" + tableorderby.Value + "]";
                                var targetIndex = 0;
                                foreach (var targetitem in targetWithTables)
                                {
                                    targetIndex++;
                                    if (targetitem.Value.Contains(table))
                                    {
                                        //同步子数据库
                                        var target = targetitem.Key;
                                        var result = DoTableDataSync(table, tableKey, sourceData, Columns, target, targetIndex, dataIndex);
                                    }
                                }

                                //记录当前索引位置                              
                                SetPageindex(syncIndex, table, needResetIndex ? 0 : pIndex++, maxOrderBy);
                            }
                        }
                        else
                        {
                            if (tryCount > tryLimit)
                            {
                                WriteLog(table + " [" + syncIndex + "]同步未完成[" + tryCount + "]!", ConsoleColor.Red);
                                SetPageindex(syncIndex, table, 0, maxOrderBy);
                                break;
                            }
                            Thread.Sleep(1000 * 2);
                            tryCount++;
                        }
                    }
                }), sourceColumns);
                tasks.Add(thistask);           
            }
            Task.WaitAll(tasks.ToArray());
            WriteLog("[" + syncIndex + "]数据同步完成!", ConsoleColor.Green);
        }
        static bool DoTableStructureSync(string table, Dictionary<string, string> sourceColumnMap, Dictionary<string, string> sourceIndexMap, IDbProxy target, string syncIndex, int targetIndex = 1,  int SyncType = 1)
        {
            Dictionary<string, string> targetColumnMap = target.GetTableStructure(table);
            if (SyncType == 1)
            {
                foreach (var column in targetColumnMap)
                {
                    //删字段
                    if (!sourceColumnMap.ContainsKey(column.Key))
                    {
                        //target.Execute("alter table " + table + " drop " + column.Key);
                        if (target.DeleteTableColumn(table,column) > 0)
                        {
                            ClearPageindex(syncIndex, table);
                        }
                        WriteLog(table + " 删除列" + column.Key);
                    }
                }
            }

            foreach (var column in sourceColumnMap)
            {
                //加字段
                if (!targetColumnMap.ContainsKey(column.Key))
                {
                    //var sql = "alter table " + table + " add " + column.Value;
                    //target.Execute("alter table " + table + " add " + column.Value);
                    if(target.AddTableColumn(table, column)>0)
                    {
                        ClearPageindex(syncIndex, table);
                    }
                    ClearPageindex("", table);
                    WriteLog(table + " 新增列" + column.Key);
                }
                //改字段
                else if (targetColumnMap[column.Key] != column.Value)
                {
                    target.ModifyTableColumn(table, column);
                    WriteLog(table + " 修改列" + column.Key);
                }
            }

            Dictionary<string, string> targetIndexMap = target.GetIndexMap(table);            
            foreach (var index in sourceIndexMap)
            {
                if(!targetIndexMap.ContainsKey(index.Key))
                {
                    //添加
                    target.AddTableIndex(table, index);
                    WriteLog(table + " 添加索引" + index.Key);
                }
                else
                {
                    if(targetIndexMap[index.Key] != index.Value)
                    {
                        //修改
                        target.ModifyTableIndex(table, index);
                        WriteLog(table + " 修改索引" + index.Key);
                    }
                }
                
            }
            if (SyncType == 1)
            {
                foreach (var index in targetIndexMap)
                {
                    //删索引
                    if (!sourceIndexMap.ContainsKey(index.Key))
                    {
                        target.DeleteTableIndex(table, index);
                        WriteLog(table + " 删除索引" + index.Key);
                    }
                }
            }
            

            return true;
        }
        static bool DoTableDataSync(string table,string key, List<Dictionary<string, string>> sourceData, Dictionary<string, string> Columns, IDbProxy target,int targetIndex=1,string dataIndex="", int SyncType = 1)
        {
            //增,删,改
            //int[] result = new int[] { 0, 0, 0 };
            //var key = "";
            Dictionary<string, string> targetColumns = target.GetTableStructure(table);
            List<string> keyidnexList = new List<string>();
            foreach (var item in sourceData)
            {
                var keyIndex = item[key];
                keyidnexList.Add("'" + keyIndex + "'");
            }
            var targetData = target.GetTableDataByKeys(table, key, keyidnexList);
            List<string> tableFieldList = Columns.Select(k => "`" + k + "`" + "=" + "values(" + "`" + k + "`" + ")").ToList();
            List<Dictionary<string, string>> needUpdateValueList = new List<Dictionary<string, string>>();
            List<Dictionary<string, string>> needInsertValueList = new List<Dictionary<string, string>>();
            foreach (var item in sourceData)
            {

                var targetItem = targetData.Where(t => t[key] == item[key]).FirstOrDefault();
                if (targetItem == null)
                {
                    //插入数据                         
                    needInsertValueList.Add(item);
                }
                else
                {
                    if (!DbProxy.DicCompare(item, targetItem))
                    {
                        //更新
                        needUpdateValueList.Add(item);
                    }
                }
            }
            if (needInsertValueList.Count > 0)
            {
               var  result = target.InsertData(table, needInsertValueList);
                 WriteLog(table + " ["+targetIndex+"]新增" + needInsertValueList.Count + "/" + result + "条!,进度 "+dataIndex );
            }
            if (needUpdateValueList.Count > 0)
            {
                var result = target.UpdateData(table, needUpdateValueList);
                
                WriteLog(table + " ["+ targetIndex + "]更新" + needUpdateValueList.Count + "/" + result + "条!,进度" + dataIndex);
            }
            return true;
        }
        static void WriteLog(string text, ConsoleColor color = ConsoleColor.White)
        {
            Console.ForegroundColor = color;
            Console.WriteLine(DateTime.Now + ": " + text);
        }
    }
}
